# Seed applied to the Python, NumPy and TensorFlow random generators below.
SEED=5469
# Directory holding the pickled dataset (train.p / test.p) and signnames.csv.
BASE_DIR='./traffic-signs-data'
# Directory of per-class reference sign pictures, read as '<class id>.jpg'.
SIGNNAMES_DIR = BASE_DIR + '/signnames/'
# Directory receiving checkpoints, TensorBoard logs and rendered result images.
OUT_DIR ='./traffic-signs-classification'
# Standard libs
import pickle
import csv
from timeit import default_timer as timer
import os
import sys
#Visualisation
%matplotlib inline
from tqdm import tqdm_notebook
import matplotlib.pyplot as plt
from IPython.display import Image
from IPython.display import display
# numerical libs
import cv2
import math
import random
import numpy as np
random.seed(SEED)
np.random.seed(SEED)
import tensorflow as tf
tf.set_random_seed(SEED)
from tensorflow.python.training import moving_averages
from tensorflow.contrib.framework import add_model_variable
sess = tf.InteractiveSession()
progressbar_width = '500'
The pickled data is a dictionary with 4 key/value pairs:
'features' is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels). 'labels' is a 1D array containing the label/class id of the traffic sign. The file signnames.csv contains id -> name mappings for each id. 'sizes' is a list containing tuples, (width, height), representing the original width and height of the image. 'coords' is a list containing tuples, (x1, y1, x2, y2), representing coordinates of a bounding box around the sign in the image. These coordinates assume the original image. The pickled data contains the resized versions (32x32) of these images.
# Load pickled dataset
def load_data():
    """Load the sign class names and the pickled train/test splits.

    Reads ``signnames.csv``, ``train.p`` and ``test.p`` from ``BASE_DIR``.

    Returns:
        Tuple ``(classnames, X_train, y_train, X_test, y_test)``: the class
        names ordered by class id, the 'features' arrays cast to float32 and
        the 'labels' arrays cast to int32.

    Raises:
        ValueError: if the ids in ``signnames.csv`` are not the consecutive
            integers 0, 1, 2, ... in row order.
    """
    training_data = BASE_DIR + '/train.p'
    testing_data = BASE_DIR + '/test.p'
    classname_data = BASE_DIR + '/signnames.csv'

    classnames = []
    # newline='' lets the csv module do its own newline handling, as its
    # documentation requires for file objects.
    with open(classname_data, newline='') as _f:
        rows = csv.reader(_f, delimiter=',')
        next(rows, None)  # skip the headers
        for i, row in enumerate(rows):
            # The list index doubles as the class id, so the ids must be
            # consecutive; an explicit raise survives `python -O`.
            if i != int(row[0]):
                raise ValueError(
                    'signnames.csv: expected class id %d, found %s' % (i, row[0]))
            classnames.append(row[1])

    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    with open(training_data, mode='rb') as f:
        train = pickle.load(f)
    with open(testing_data, mode='rb') as f:
        test = pickle.load(f)

    X_train, y_train = train['features'], train['labels']
    X_test, y_test = test['features'], test['labels']

    X_train = X_train.astype(np.float32)
    y_train = y_train.astype(np.int32)
    X_test = X_test.astype(np.float32)
    y_test = y_test.astype(np.int32)
    return classnames, X_train, y_train, X_test, y_test
Here, a basic summary of the data set is presented:
### Replace each question mark with the appropriate value.
# Load everything once and derive the basic dimensions of the dataset.
classnames, X_train, y_train, X_test, y_test = load_data()

# Sample counts of both splits and their total
num_train = len(X_train)
num_test = len(X_test)
num_total = num_train + num_test

# Geometry of one traffic sign image, read from the trailing axes
_, height, width, channel = X_train.shape
image_shape = (height, width, channel)

# Number of distinct label values in the training labels
num_class = len(np.unique(y_train))

print("Number of training examples =", num_train)
print("Number of testing examples =", num_test)
print("Image data shape =", image_shape)
print("Number of classes =", num_class)
Here, the German Traffic Signs Dataset has been visualized using the pickled dataset available from the course. Below, a histogram analysis of the training and testing set images has been prepared for all 43 classes of traffic signs present in the pickled dataset.
### Data exploration visualization goes here.
# Helper function to draw graph,etc
def get_image_label(c):
    """Return the reference picture of class ``c`` as a 32x32 RGB image.

    Args:
        c: class id; the picture is read from ``SIGNNAMES_DIR + '<c>.jpg'``.

    Returns:
        The picture with channels reordered from BGR to RGB, resized to 32x32.

    Raises:
        FileNotFoundError: if the picture is missing or cannot be decoded.
    """
    path = SIGNNAMES_DIR + str(c) + '.jpg'
    img = cv2.imread(path, 1)  # flag 1: load as a 3-channel colour image
    if img is None:
        # cv2.imread reports failure by returning None instead of raising,
        # which would otherwise surface as an opaque cvtColor error.
        raise FileNotFoundError('cannot read sign image: %s' % path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return cv2.resize(img, (32, 32))
def insert_subimage(image, sub_image, y, x):
    """Paste ``sub_image`` into ``image`` with its top-left corner at (y, x).

    ``image`` is modified in place and is also returned. ``sub_image`` must
    be 3-dimensional (rows, cols, channels).
    """
    rows, cols, _ = sub_image.shape
    image[y:y + rows, x:x + cols, :] = sub_image
    return image
def display_dataset(images, labels, dataset_type):
    """Render, save and show an overview sheet of one dataset split.

    One row is drawn per class: class id and name, the reference sign
    picture, the mean image of the class, ``num_sample`` randomly chosen
    samples, the sample count and a frequency bar. The sheet is written to
    ``BASE_DIR + '/data_<dataset_type>.jpg'`` and shown with matplotlib.

    Args:
        images: image array indexed by sample along axis 0.
        labels: class id of every image.
        dataset_type: 'train' or 'test'; used in captions and the file name.

    Relies on the module-level ``num_class``, ``num_total``, ``height``,
    ``width``, ``channel`` and ``classnames``.
    """
    data_images, data_labels = images, labels

    # results image
    num_sample = 15
    results_image = 255.*np.ones(shape=((num_class+2)*height, (num_sample+2+22)*width, channel), dtype=np.float32)
    cv2.putText(results_image, "Traffic signs class and label", (0, height), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
    cv2.putText(results_image, "Traffic signs " + dataset_type + "ing dataset", (width+520, height), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 0, 0), 2)
    cv2.putText(results_image, "Class Frequency", ((2+num_sample+15)*width, height), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
    # Horizontal rule below the captions. OpenCV points are (x, y), so the
    # right end is the sheet *width* (shape[1]), not its height (shape[0]).
    cv2.line(results_image, (0, height+10), (results_image.shape[1], height+10), (0, 0, 127), 1)

    for c in tqdm_notebook(range(num_class), desc="Loading " + dataset_type + "ing"):
        label_image = get_image_label(c)
        insert_subimage(results_image, label_image, (c+2)*height, 450)

        # Calculate the mean image parameters
        idx = list(np.where(data_labels == c)[0])
        mean_image = np.average(data_images[idx], axis=0)
        insert_subimage(results_image, mean_image, (c+2)*height, width+460)

        # Make random samples
        for n in range(num_sample):
            sample_image = data_images[np.random.choice(idx)]
            insert_subimage(results_image, sample_image, (c+2)*height, (2+n)*width+460)

        # Dataset summary: count and share relative to train + test together
        count = len(idx)
        percentage = float(count)/float(num_total)
        cv2.putText(results_image, '%02d:%-6s'%(c, classnames[c]), (0, int(((c+2)+0.7)*height)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
        cv2.putText(results_image, '[%4d]'%(count), ((2+num_sample+15)*width, int(((c+2)+0.7)*height)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 200, 100), 1)
        cv2.rectangle(results_image, ((2+num_sample+17)*width, (c+2)*height), ((2+num_sample+17)*width + round(percentage*3000), ((c+2)+1)*height), (127*(c%2), 0, 255), -1)

    # The sheet is RGB; the channel swap converts it to the BGR order that
    # cv2.imwrite expects (the swap is symmetric).
    cv2.imwrite(BASE_DIR+'/data_' + dataset_type + '.jpg', cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB))
    plt.rcParams["figure.figsize"] = (25, 25)
    plt.imshow(results_image.astype(np.uint8))
    plt.axis('off')
    plt.show()
# Render the overview sheet of the training split.
display_dataset (X_train, y_train, "train")
from time import sleep
# Pause between the two renderings.
sleep(5) # Time in seconds.
# Render the overview sheet of the test split.
display_dataset (X_test, y_test, "test")
Here I focus on designing and implementing a deep learning model that learns to recognize traffic signs. Training and testing of the model is done on the German Traffic Sign Dataset.
The training data is initially split into a training and a validation set using a random seed. Next, the dataset is expanded and augmented with fake data by different techniques:
Also, it is made sure that each class gets a similar number of training samples. It was also made sure that if, after flipping or another perturbation, the resulting image belongs to some other class, it is labelled accordingly.
# split into train and validation.
def split_data(X_train, y_train, num_valid=3000):
    """Randomly split the training data into a training and a validation part.

    The sample indices are shuffled with Python's ``random`` module; the
    first ``num_valid`` shuffled indices form the validation set and the
    remaining ones the training set.

    Returns:
        ``(train_images, train_labels, valid_images, valid_labels)``.
    """
    order = list(range(len(y_train)))  # ~40000 entries for the full dataset
    random.shuffle(order)
    valid_index, train_index = order[:num_valid], order[num_valid:]
    return (X_train[train_index], y_train[train_index],
            X_train[valid_index], y_train[valid_index])
# This expands the train data by flipping.
# Note: this code is from : http://navoshta.com/traffic-signs-classification/
def extend_data_by_flipping(images, labels):
    """Extend a labelled image set with flipped copies that stay valid signs.

    Note: the flipping rules are from http://navoshta.com/traffic-signs-classification/

    For every class id 0..42, in order, the output holds: the original
    images of the class; their horizontal mirror if the class is
    horizontally symmetric; the mirrored images of the partner class if
    mirroring turns that partner into this class; then, for vertically
    symmetric classes, a vertical flip of everything gathered so far for the
    class; and, for classes symmetric under a combined flip, a copy of
    everything gathered so far flipped along both axes.

    Args:
        images: 4-D image array indexed (sample, row, column, channel).
        labels: 1-D array with the class id of every image.

    Returns:
        ``(extend_datas, extend_labels)``: the extended images (float32
        unless NumPy promotes the input dtype further) and int32 labels,
        grouped by class id.
    """
    X = images
    y = labels
    # Classes of signs that, when flipped horizontally, should still be classified as the same class
    self_flippable_horizontally = np.array([11, 12, 13, 15, 17, 18, 22, 26, 30, 35])
    # Classes of signs that, when flipped vertically, should still be classified as the same class
    self_flippable_vertically = np.array([1, 5, 12, 15, 17])
    # Classes of signs that, when flipped horizontally and then vertically, should still be classified as the same class
    self_flippable_both = np.array([32, 40])
    # Classes of signs that, when flipped horizontally, would still be meaningful, but should be classified as some other class
    cross_flippable = np.array([
        [19, 20],
        [33, 34],
        [36, 37],
        [38, 39],
        [20, 19],
        [34, 33],
        [37, 36],
        [39, 38],
    ])
    num_classes = 43

    # Pieces are collected per class and joined once at the end; appending to
    # one growing array inside the loop would re-copy all data each time.
    # The empty seed arrays fix the result dtypes exactly as before.
    image_chunks = [np.empty([0, X.shape[1], X.shape[2], X.shape[3]], dtype=np.float32)]
    label_chunks = [np.empty([0], dtype=np.int32)]

    for c in tqdm_notebook(range(num_classes), desc="Flipping", ncols=progressbar_width):
        # First copy existing data for this class
        own = X[y == c]
        parts = [own]
        # If we can flip images of this class horizontally and they would still belong to said class...
        if c in self_flippable_horizontally:
            parts.append(own[:, :, ::-1, :])
        # If we can flip images of this class horizontally and they would belong to other class...
        if c in cross_flippable[:, 0]:
            # ...take the flipped images of that other class.
            flip_class = cross_flippable[cross_flippable[:, 0] == c][0][1]
            parts.append(X[y == flip_class][:, :, ::-1, :])
        class_images = np.concatenate(parts, axis=0)

        # Vertical flip of everything gathered so far for this class
        if c in self_flippable_vertically:
            class_images = np.concatenate((class_images, class_images[:, ::-1, :, :]), axis=0)
        # Combined horizontal AND vertical flip of everything gathered so far
        if c in self_flippable_both:
            class_images = np.concatenate((class_images, class_images[:, ::-1, ::-1, :]), axis=0)

        image_chunks.append(class_images)
        label_chunks.append(np.full(class_images.shape[0], c, dtype=np.int32))

    extend_datas = np.concatenate(image_chunks, axis=0)
    extend_labels = np.concatenate(label_chunks, axis=0)
    return (extend_datas, extend_labels)
# use opencv to do data agumentation
def perturb(image, keep, angle_limit=15, scale_limit=0.1, translate_limit=3, distort_limit=3, illumin_limit=0.7):
    """Return a randomly distorted version of ``image``, or ``image`` itself.

    A uniform draw decides: if it is not greater than ``keep`` the input is
    returned untouched (same object). Otherwise the image is warped by a
    random rotation, scaling, translation and corner jitter, and then its
    brightness, contrast and saturation are randomly changed, clipping to
    [0, 255] after each step.

    Args:
        image: (rows, cols, 3) image; the pipeline passes float32 arrays
            with values in 0..255.
        keep: probability of returning the image unchanged.
        angle_limit: maximum rotation in degrees.
        scale_limit: maximum relative change of scale.
        translate_limit: maximum shift in pixels per axis.
        distort_limit: standard deviation of the per-corner jitter in pixels.
        illumin_limit: maximum relative change of each photometric factor.

    Returns:
        The distorted image, with the same shape as ``image``.
    """
    u = np.random.uniform()
    if u <= keep:
        return image

    # Arrays are indexed (row, col) = (y, x) while OpenCV points and sizes
    # are (x, y); unpacking the shape as (W, H) only worked for square images.
    H, W, _ = image.shape
    center = np.array([W / 2., H / 2.])

    da = np.random.uniform(low=-1, high=1) * angle_limit/180. * math.pi
    scale = np.random.uniform(low=-1, high=1) * scale_limit + 1
    cc = scale*math.cos(da)
    ss = scale*math.sin(da)
    rotation = np.array([[cc, ss], [-ss, cc]])
    translation = np.random.uniform(low=-1, high=1, size=(1, 2)) * translate_limit
    distort = np.random.standard_normal(size=(4, 2)) * distort_limit

    # Image corners before and after the similarity transform
    pts1 = np.array([[0., 0.], [0., H], [W, H], [W, 0.]])
    pts2 = np.matmul(pts1-center, rotation) + center + translation
    # add perspective noise
    pts2 = pts2 + distort

    matrix = cv2.getPerspectiveTransform(pts1.astype(np.float32), pts2.astype(np.float32))
    warped = cv2.warpPerspective(image, matrix, (W, H), flags=cv2.INTER_LINEAR,
                                 borderMode=cv2.BORDER_REFLECT_101)

    # rgb to gray (YCbCr) : Y = 0.299R + 0.587G + 0.114B
    coef = np.array([[[0.299, 0.587, 0.114]]])

    # brightness: scale all values
    alpha = 1.0 + illumin_limit*random.uniform(-1, 1)
    warped *= alpha
    warped = np.clip(warped, 0., 255.)

    # contrast: blend with the mean luminance of the whole image
    alpha = 1.0 + illumin_limit*random.uniform(-1, 1)
    gray = warped * coef
    gray = (3.0 * (1.0 - alpha) / gray.size) * np.sum(gray)
    warped *= alpha
    warped += gray
    warped = np.clip(warped, 0., 255.)

    # saturation: blend with the per-pixel luminance
    alpha = 1.0 + illumin_limit*random.uniform(-1, 1)
    gray = warped * coef
    gray = np.sum(gray, axis=2, keepdims=True)
    gray *= (1.0 - alpha)
    warped *= alpha
    warped += gray
    warped = np.clip(warped, 0., 255.)

    return warped
def make_perturb_images(images, keep ):
    """Run ``perturb`` on every image and return the results as a new array.

    The result has the shape of ``images``; it is allocated with
    ``np.zeros`` without a dtype, i.e. with NumPy's default float type.
    """
    perturbed = np.zeros(images.shape)
    progress = tqdm_notebook(range(len(images)), desc="Perturbing", ncols=progressbar_width)
    for position in progress:
        perturbed[position] = perturb(images[position], keep=keep)
    return perturbed
# sample and shuffle the data such that each class has equal number of samples for training
def shuffle_data_uniform(datas, labels, num_class, num_per_class=None):
    """Draw the same number of samples from every class and shuffle them.

    For each class id in ``range(num_class)``, ``num_per_class`` indices are
    drawn with ``np.random.choice`` (i.e. with replacement) from the samples
    of that class. If ``num_per_class`` is None, the size of the largest
    class is used. The combined selection is shuffled with ``random.shuffle``.

    Returns:
        ``(shuffle_datas, shuffle_labels)`` indexed by the shuffled selection.
    """
    members = [list(np.where(labels == c)[0]) for c in range(num_class)]

    if num_per_class is None:
        num_per_class = max((len(idx) for idx in members), default=0)

    index = []
    for idx in members:
        index.extend(np.random.choice(idx, num_per_class))
    random.shuffle(index)

    return datas[index], labels[index]
#generate next batch for sdg
def generate_train_batch_next(datas, labels, n, batch_size):
    """Return the ``n``-th consecutive mini-batch of ``batch_size`` samples.

    The batch covers positions ``n*batch_size`` up to (excluding)
    ``(n+1)*batch_size``; it is shorter or empty past the end of the data.
    """
    window = slice(n * batch_size, n * batch_size + batch_size)
    return datas[window], labels[window]
# Prepare all data: reload, split off a validation set, extend by flipping
# and build one class-balanced, perturbed sample set.
classnames, X_train, y_train, X_test, y_test = load_data()
train_images, train_labels, valid_images, valid_labels = split_data(X_train, y_train)
test_images, test_labels = X_test, y_test

num_train, num_valid, num_test = len(train_images), len(valid_images), len(test_images)

print('** Dataset details **')
print('Height, width, channel = %d, %d, %d'%(height, width, channel))
print('Number of test set = %d'%num_test)
print('Number of validation set = %d'%num_valid)
print('Number of training set = %d'%num_train)

# Flip-based extension of the training split
train_images, train_labels = extend_data_by_flipping(train_images, train_labels)
num_train_flip = len(train_images)
print('')
print('Number of training set(after flip)= %d' % num_train_flip )

# Augmentation settings: `keep` is the probability that perturb() leaves an
# image untouched (tried values: 0.50 0.25 0.20 0.15).
keep = 0.20
num_per_class = 1000
num_augmented = num_per_class*num_class

augmented_images, augmented_labels = shuffle_data_uniform(
    train_images, train_labels, num_class, num_per_class=num_per_class)
augmented_images = make_perturb_images(augmented_images, keep=keep)
num_augmented = len(augmented_images)
print('Number of augmented images = %d' % num_augmented)
# Preview of the augmentation: each row shows one randomly picked training
# image followed by freshly perturbed versions of it.
print('\n')
print('Examples of augmented images (First column is the orginal image)')

num_sample = 20
perturbance_per_sample = 20

# White canvas: one row per sample, one column per image plus a 10 px gap
results_image = np.full(
    (num_sample * height, (perturbance_per_sample+1) * width + 10, channel),
    255., dtype=np.float32)

for j in tqdm_notebook(range(num_sample), desc="Loading display", ncols=progressbar_width):
    i = random.randint(0, num_train_flip - 1)
    image = train_images[i]
    insert_subimage(results_image, image, j * height, 0)

    for k in range(0, perturbance_per_sample):
        # keep=0 leaves (practically) no chance of returning the original
        perturb_image = perturb(image, keep=0)
        insert_subimage(results_image, perturb_image, j*height, (k+1)*width+10)

cv2.imwrite(BASE_DIR+'/data_augmented.jpg', cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB))
plt.rcParams["figure.figsize"] = (25,25)
plt.imshow(results_image.astype(np.uint8))
plt.axis('off')
plt.show()
The dense block from the paper "Densely Connected Convolutional Networks" - Gao Huang, Zhuang Liu, Kilian Q. Weinberger, Laurens van der Maaten, Arxiv 2016, was used.
Reuse and some modification(s):
### Define your architecture here.
## global variables ##
# Boolean placeholder fed on every sess.run call: True while training, False
# for evaluation. The dropout() and bn() helpers branch on it via tf.cond.
IS_TRAIN_PHASE = tf.placeholder(dtype=tf.bool, name='is_train_phase')
def conv2d(x, num_kernels=1, kernel_size=(1,1), stride=[1,1,1,1], padding='SAME', has_bias=True, name='conv'):
    """2-D convolution layer with an optional bias.

    Creates the variable ``<name>_weight`` of shape
    [kernel height, kernel width, input channels, num_kernels], initialised
    from a truncated normal with stddev 0.1, and, if ``has_bias``, the
    zero-initialised variable ``<name>_bias``. ``x`` must be a 4-D tensor.
    """
    dims = x.get_shape().as_list()
    assert len(dims)==4
    in_channels = dims[3]
    kernel_h, kernel_w = kernel_size[0], kernel_size[1]

    # [filter_height, filter_width, in_channels, out_channels]
    weight = tf.get_variable(name=name+'_weight',
                             shape=[kernel_h, kernel_w, in_channels, num_kernels],
                             initializer=tf.truncated_normal_initializer(stddev=0.1))
    out = tf.nn.conv2d(x, weight, strides=stride, padding=padding, name=name)
    if not has_bias:
        return out

    bias = tf.get_variable(name=name + '_bias', shape=[num_kernels],
                           initializer=tf.constant_initializer(0.0))
    return out + bias
def relu(x, name='relu'):
    """Apply the rectified linear activation to ``x``."""
    return tf.nn.relu(x, name=name)
def prelu(x, name='prelu'):
    """Parametric ReLU with a learnable slope for negative inputs.

    Creates the variable ``<name>_alpha``, sized by the last dimension of
    ``x`` and initialised uniformly in [0.1, 0.3].
    """
    slope = tf.get_variable(name=name+'_alpha', shape=x.get_shape()[-1],
                            initializer=tf.random_uniform_initializer(minval=0.1, maxval=0.3),
                            dtype=tf.float32)
    positive_part = tf.nn.relu(x)
    # (x - |x|) / 2 equals x for negative inputs and 0 otherwise
    negative_part = slope * (x - abs(x)) * 0.5
    return positive_part + negative_part
# very leaky relu
#def vlrelu(x, alpha=0.25, name='vlrelu'): # alpha between 0.1 to 0.5
# act =tf.maximum(alpha*x,x)
# return act
def maxpool(x, kernel_size=(1,1), stride=[1,1,1,1], padding='SAME', has_bias=True, name='max' ):
    """Max-pool ``x`` with a ``kernel_size`` window; ``has_bias`` is unused."""
    window = [1, kernel_size[0], kernel_size[1], 1]
    return tf.nn.max_pool(x, ksize=window, strides=stride, padding=padding, name=name)
def avgpool(x, kernel_size=(1,1), stride=[1,1,1,1], padding='SAME', has_bias=True, is_global_pool=False, name='avg'):
    """Average-pool ``x``, either over a window or over the whole feature map.

    Args:
        x: input tensor; must be 4-D with static height and width when
            ``is_global_pool`` is set.
        kernel_size: (height, width) of the window for windowed pooling.
        stride: strides for windowed pooling.
        padding: padding mode for windowed pooling.
        has_bias: unused.
        is_global_pool: if true, average over the full height and width and
            flatten the result to 2-D; ``kernel_size``, ``stride`` and
            ``padding`` are then ignored.
        name: name of the pooling op.

    Returns:
        The pooled tensor (flattened when ``is_global_pool`` is set).
    """
    if is_global_pool:
        input_shape = x.get_shape().as_list()
        assert len(input_shape) == 4
        H = input_shape[1]
        W = input_shape[2]
        # Window and stride both cover the whole map: one value per channel
        pool = tf.nn.avg_pool(x, ksize=[1, H, W, 1], strides=[1, H, W, 1], padding='VALID', name=name)
        return flatten(pool)

    H = kernel_size[0]
    W = kernel_size[1]
    return tf.nn.avg_pool(x, ksize=[1, H, W, 1], strides=stride, padding=padding, name=name)
def dropout(x, keep=1.0, name='drop'):
    """Dropout that is only active while ``IS_TRAIN_PHASE`` is fed True.

    In the training branch units are kept with probability ``keep``; the
    other branch calls dropout with a keep probability of 1.
    """
    def _train_branch():
        return tf.nn.dropout(x, keep)

    def _eval_branch():
        return tf.nn.dropout(x, 1)

    return tf.cond(IS_TRAIN_PHASE, _train_branch, _eval_branch)
def flatten(x, name='flat'):
    """Reshape ``x`` to 2-D, keeping the first axis and merging the rest.

    Example: a static shape of [None, 9, 2] becomes [-1, 18].
    """
    static_shape = x.get_shape().as_list()
    merged = np.prod(static_shape[1:])
    return tf.reshape(x, [-1, merged], name=name)  # -1 means "all"
def concat(x, name='cat'):
    """Concatenate the tensors in ``x`` along dimension 3."""
    # NOTE(review): `concat_dim` looks like the pre-1.0 TensorFlow keyword of
    # tf.concat - confirm against the installed TensorFlow version.
    return tf.concat(concat_dim=3, values=x, name=name)
def bn(x, decay=0.9, eps=1e-5, name='bn'):
    """Batch normalisation that follows ``IS_TRAIN_PHASE``.

    Both branches of the tf.cond build ``tf.contrib.layers.batch_norm`` in
    the same variable scope: the training branch with ``is_training=1`` and
    ``reuse=None``, the other with ``is_training=0`` and ``reuse=True``.
    """
    with tf.variable_scope(name) as scope:
        shared = dict(decay=decay, epsilon=eps, center=True, scale=True,
                      updates_collections=None, scope=scope)

        def _training():
            return tf.contrib.layers.batch_norm(x, is_training=1, reuse=None, **shared)

        def _inference():
            return tf.contrib.layers.batch_norm(x, is_training=0, reuse=True, **shared)

        normed = tf.cond(IS_TRAIN_PHASE, _training, _inference)
    return normed
# basic building blocks
def bn_relu_conv2d(x, num_kernels=1, kernel_size=(1, 1), stride=[1, 1, 1, 1], padding='SAME', name='conv'):
    """Basic building block: batch norm, then ReLU, then a bias-free conv."""
    with tf.variable_scope(name) as scope:
        activated = relu(bn(x))
        return conv2d(activated, num_kernels=num_kernels, kernel_size=kernel_size,
                      stride=stride, padding=padding, has_bias=False)
def dense_block_cbr(x, num=1, num_kernels=1, kernel_size=(1, 1), drop=None, name='DENSE'):
    """Dense block of ``num`` conv -> batch norm -> ReLU layers.

    The output of every layer is concatenated to its input, so each layer
    sees the outputs of all previous ones. If ``drop`` is given, each layer
    applies dropout with keep probability ``(1 - drop) ** (1 / num)``, so
    that the ``num`` keep probabilities multiply to ``1 - drop``.
    """
    features = x
    for layer in range(num):
        with tf.variable_scope(name + '_%d' % layer) as scope:
            grown = conv2d(features, num_kernels=num_kernels, kernel_size=kernel_size,
                           stride=[1, 1, 1, 1], padding='SAME', has_bias=False)
            grown = relu(bn(grown))
            if drop is not None:
                grown = dropout(grown, keep=(1 - drop) ** (1. / num))
            features = concat((features, grown))
    return features
# the loss
def l2_regulariser(decay):
    """Build the L2 weight-decay term of the loss.

    For every global variable whose name contains 'weight',
    ``decay * l2_loss`` is added to the 'losses' collection; all other
    variables (biases, batch-norm parameters and statistics, anything else)
    are skipped. Returns the sum of the 'losses' collection.
    """
    for variable in tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES):
        if 'weight' in variable.name:
            tf.add_to_collection('losses', decay * tf.nn.l2_loss(variable))
    return tf.add_n(tf.get_collection('losses'))
def cross_entropy(logit, label, name='cross_entropy'):
    """Mean sparse softmax cross-entropy between ``logit`` and ``label``."""
    int_label = tf.cast(label, tf.int64)
    per_sample = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logit, labels=int_label)
    return tf.reduce_mean(per_sample, name=name)
def accuracy(prob, label, name='accuracy'):
    """Fraction of samples whose arg-max over axis 1 of ``prob`` equals ``label``."""
    predicted = tf.argmax(prob, 1)
    hits = tf.equal(predicted, tf.cast(label, tf.int64))
    return tf.reduce_mean(tf.cast(hits, tf.float32), name=name)
# The densenet network with the inference part (without loss)
def DenseNet( input_shape=(1,1,1), output_shape = (1)):
    """Build the inference part of the network (without loss).

    Creates the input placeholder named 'x' of shape [None, H, W, C] and
    stacks a learnable colour pre-processing stage, one conv block and three
    dense blocks. The last block ends in a 1x1 convolution with
    ``output_shape`` kernels followed by global average pooling.

    Returns:
        ``(block1, block2, block3, block4)``: the outputs of the four blocks.
    """
    H, W, C = input_shape
    num_class = output_shape
    images = tf.placeholder(shape=[None, H, W, C], dtype=tf.float32, name='x')

    unit_stride = [1, 1, 1, 1]
    halve = dict(kernel_size=(2, 2), stride=[1, 2, 2, 1], padding='SAME')

    # Colour preprocessing using a conv net, see "Systematic evaluation of
    # CNN advances on the ImageNet" - Dmytro Mishkin, Nikolay Sergievskiy,
    # Jiri Matas, ARXIV 2016, https://arxiv.org/abs/1606.02228
    # Learnable prelu (different from the paper) and a 3x3 conv are used.
    with tf.variable_scope('preprocess') as scope:
        net = bn(images, name='b1')
        net = conv2d(net, num_kernels=8, kernel_size=(3, 3), stride=unit_stride, padding='SAME', has_bias=True, name='c1')
        net = prelu(net, name='r1')
        net = conv2d(net, num_kernels=8, kernel_size=(1, 1), stride=unit_stride, padding='SAME', has_bias=True, name='c2')
        net = prelu(net, name='r2')

    with tf.variable_scope('block1') as scope:
        block1 = bn_relu_conv2d(net, num_kernels=32, kernel_size=(5, 5), stride=unit_stride, padding='SAME')
        block1 = maxpool(block1, **halve)

    # dropout is taken out of the dense blocks
    with tf.variable_scope('block2') as scope:
        block2 = dense_block_cbr(block1, num=4, num_kernels=16, kernel_size=(3, 3), drop=None)
        block2 = maxpool(block2, **halve)

    with tf.variable_scope('block3') as scope:
        block3 = dense_block_cbr(block2, num=4, num_kernels=24, kernel_size=(3, 3), drop=None)
        block3 = dropout(block3, keep=0.9)
        block3 = maxpool(block3, **halve)

    with tf.variable_scope('block4') as scope:
        block4 = dense_block_cbr(block3, num=4, num_kernels=32, kernel_size=(3, 3), drop=None)
        block4 = bn_relu_conv2d(block4, num_kernels=num_class, kernel_size=(1, 1), stride=unit_stride, padding='SAME')
        block4 = dropout(block4, keep=0.8)
        block4 = avgpool(block4, is_global_pool=True)

    return block1, block2, block3, block4
# construct the graph here
# DenseNet returns the outputs of its four blocks; the last one serves as the logits.
block1, block2, block3, logit = DenseNet(input_shape =(height, width, channel), output_shape=(num_class))
# Handle to the input placeholder that DenseNet created under the name 'x'.
data = tf.get_default_graph().get_tensor_by_name('x:0')
# Integer class ids of the fed batch.
label = tf.placeholder(dtype=tf.int32, shape=[None])
prob = tf.nn.softmax(logit)
# Weight-decay term; it is added to the cross-entropy when the solver is built.
l2 = l2_regulariser(decay=0.0005)
loss = cross_entropy(logit, label)
metric = accuracy(prob, label)
A validation set can be used to assess how well the model is performing. A low accuracy on the training and validation sets implies underfitting. A high accuracy on the training set but low accuracy on the validation set implies overfitting.
### Train your model here.
### Feel free to use as many code cells as needed.
# changing of learning rate
def schdule_by_step( r, steps=(0,100), items=(0.1,0.01)):
    """Look up the scheduled value (e.g. the learning rate) for run ``r``.

    Returns ``items[n]`` for the last position ``n`` with ``r >= steps[n]``,
    or ``items[0]`` if ``r`` is below every step.
    """
    chosen = items[0]
    for position, threshold in enumerate(steps):
        if r >= threshold:
            chosen = items[position]
    return chosen
#for testing and validation
def test_net( datas, labels, batch_size, data, label, loss, metric, sess):
    """Evaluate ``loss`` and ``metric`` on a dataset, batch by batch.

    Every batch is run with ``IS_TRAIN_PHASE`` fed as False. The per-batch
    results are weighted by the batch size, so a shorter last batch is
    averaged correctly.

    Returns:
        ``(loss, acc)``: the sample-weighted means over all of ``datas``.
    """
    num = len(datas)
    seen = 0
    loss_sum = 0
    acc_sum = 0
    for start in range(0, num, batch_size):
        end = min(start + batch_size, num)
        fd = {data: datas[start:end], label: labels[start:end], IS_TRAIN_PHASE : False}
        batch_loss, batch_acc = sess.run([loss, metric], feed_dict=fd)

        count = end - start
        seen += count
        loss_sum += count*batch_loss
        acc_sum += count*batch_acc

    assert(seen==num)
    return loss_sum/seen, acc_sum/seen
# Solver settings
epoch_log = 2                       # validate roughly every `epoch_log` epochs
max_run = 8                         # number of runs over a freshly augmented set
batch_size = 128                    # tried: 128, 256, 384
steps = (0, 3, 6, 8)                # run index from which ...
rates = (0.1, 0.01, 0.001, 0.0001)  # ... the matching learning rate applies

learning_rate = tf.placeholder(tf.float32, shape=[])
solver = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)
solver_step = solver.minimize(loss+l2)

# start training here ------------------------------------------------
print ('start training')
sess.run(tf.global_variables_initializer(), feed_dict = {IS_TRAIN_PHASE : True } )
saver = tf.train.Saver()
writer = tf.summary.FileWriter(OUT_DIR + '/tf', graph=tf.get_default_graph())

# keep a log
print('')
print(' run epoch iter rate | train_loss (acc) | valid_loss (acc) | time ')
print('----------------------------------------------------------------------------------------------')

tic = timer()
iter = 0
for r in range(max_run):
    rate = schdule_by_step(r, steps=steps, items=rates)

    # Every run trains on a newly drawn, class-balanced and perturbed set
    argument_images, argument_labels = shuffle_data_uniform(
        train_images, train_labels, num_class, num_per_class=num_per_class)
    argument_images = make_perturb_images(argument_images, keep=keep)
    num_argument = len(argument_images)

    N = max(num_argument//batch_size-1,1)
    iter_log = max(round(float( epoch_log *num_train ) / float(batch_size)),1)

    for n in tqdm_notebook(range(N), desc="Training"):
        iter += 1
        run = r + float(n)/float(N)
        epoch = float(iter*batch_size)/float(num_train)

        batch_datas, batch_labels = generate_train_batch_next( argument_images, argument_labels, n, batch_size )
        fd = {data: batch_datas, label: batch_labels, learning_rate: rate, IS_TRAIN_PHASE : True }
        _, batch_loss, batch_acc, = sess.run([solver_step, loss, metric ],feed_dict=fd)

        print('\r%4.1f %5.1f %05d %f | %f (%f) ' %
              (run, epoch, iter, rate, batch_loss, batch_acc), end='', flush=True)

        # Validate every `iter_log` iterations and after the very last batch
        is_last_batch = (r==max_run-1 and n==N-1)
        if iter%iter_log==0 or is_last_batch:
            toc = timer()
            sec_pass = toc - tic
            min_pass = sec_pass/60.

            val_loss, val_acc = test_net(valid_images, valid_labels, batch_size, data, label, loss, metric, sess)
            print('\r%4.1f %5.1f %05d %f | %f (%f) | %f (%f) | %4.1f min' %
                  (run, epoch, iter, rate, batch_loss, batch_acc, val_loss, val_acc, min_pass ), end='\n',flush=True)
#final test! ------------------------------------------
# save final checkpoint
# The path given to saver.save() is used as a file-name prefix, so only its
# parent directory must exist; previously a directory called 'final.ckpt'
# was created at the prefix itself.
os.makedirs(OUT_DIR + '/check_points', exist_ok=True)
saver.save(sess, OUT_DIR + '/check_points/final.ckpt')

print('\n')
print('** evaluation on test set **' )
test_loss, test_acc = test_net(test_images, test_labels, batch_size, data, label, loss, metric, sess)
print('test_loss=%f (test_acc=%f)' % ( test_loss, test_acc))
To give yourself more insight into how your model is working, download at least five pictures of German traffic signs from the web and use your model to predict the traffic sign type.
You may find signnames.csv useful as it contains mappings from the class id (integer) to the actual sign name.
# Extra images from the web: file name, sign bounding box (x1, y1, x2, y2)
# in the full picture and the expected class id.
test_files=['0002.jpg', #normal
            '0000.jpg', #normal
            '0004.jpg', #occluded with snow
            '0006.jpg', #small
            '0001.jpg', #not in class
            ]
test_rois =[(190,135,405,330),(170,430,207,469),(1120,520,1650,1290),(226,65,242,78 ),(1370,280,1460,400)]
test_label=[17,38,14,40,25]
num=len(test_files)

# White canvases for the 32x32 crops and the marked 320 px overviews
results_image = np.full((1 * height, num * width, channel), 255., dtype=np.float32)
results_image1 = np.full((1 * 320, num * 320, channel), 255., dtype=np.float32)
crops = np.zeros(shape=(num,height,width,channel),dtype=np.float32)

for n in range(num):
    img = cv2.imread(BASE_DIR+'/extra/' + test_files[n], 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)

    # crop roi to 32x32; clipping removes the overshoot of cubic interpolation
    x1, y1, x2, y2 = test_rois[n]
    crop = cv2.resize(img[y1:y2, x1:x2, :], (0, 0),
                      fx=32. / (x2 - x1), fy=32. / (y2 - y1),
                      interpolation=cv2.INTER_CUBIC)
    crop = np.clip(crop,0,255)
    crops[n]=crop
    insert_subimage(results_image, crop, 0, n*width)

    # mark roi and show: scale the picture so its longer side is 320 px
    H,W,C=img.shape
    S=max(H,W)
    f=320./S
    norm_img = cv2.resize(img, (0, 0), fx=f, fy=f, interpolation=cv2.INTER_CUBIC)
    cv2.rectangle(norm_img, (round(f*x1), round(f*y1)), (round(f*x2), round(f*y2)), (255,255,0), 3)
    insert_subimage(results_image1, norm_img, 0, n*320)

cv2.imwrite(OUT_DIR+'/extra_crops.jpg', cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB) )
cv2.imwrite(OUT_DIR+'/extra_marked.jpg', cv2.cvtColor(results_image1, cv2.COLOR_BGR2RGB) )

plt.rcParams["figure.figsize"] = (25,25)
plt.imshow(results_image.astype(np.uint8))
plt.axis('off')
plt.show()
plt.imshow(results_image1.astype(np.uint8))
plt.axis('off')
plt.show()
new_images = crops
# load trained classifier
# Restores the weights from the final checkpoint path used by the training cell.
saver = tf.train.Saver()
saver.restore(sess, OUT_DIR + '/check_points/final.ckpt')
print('** test on extra **')
# Run the network in evaluation mode to get class probabilities per crop.
fd = {data: new_images, IS_TRAIN_PHASE: False}
test_prob = sess.run(prob, feed_dict=fd)
print('see printout of results in the next jupyter cell!')
print('success')
### Calculate the accuracy for these 5 new images.
### For example, if the model predicted 1 out of 5 signs correctly, it's 20% accurate on these new images.
### Visualize the softmax probabilities here.
#show results
f=10  # enlargement factor of every 32x32 image
# One sheet row per extra image: the enlarged image plus room for captions.
row_height = f*height + f*8
# The sheet is sized by `num` (not a literal 5) so it follows test_files.
results_image = 255. * np.ones(shape=(num*row_height, 6*f*width, channel), dtype=np.float32)

for n in range(num):
    crop = crops[n]
    c_hat=test_label[n]
    c_hat_label = classnames[c_hat] if c_hat>=0 else 'NIL'
    print('n=%d: true = %02d:%s' % (n, c_hat,c_hat_label))

    # Nearest-neighbour enlargement by repeating rows and columns
    crop = crop.repeat(f, axis=0).repeat(f, axis=1)
    insert_subimage(results_image, crop, n * row_height, 0)
    # The ellipsis test uses the label actually printed, so it is also
    # correct for the 'NIL' placeholder of negative class ids.
    cv2.putText(results_image, '%02d:%s%s' % (c_hat,c_hat_label[0:15], '...' if len(c_hat_label)>15 else ''),
                (5, (n+1) * row_height-50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)

    # Class ids ordered by decreasing probability
    p = test_prob[n]
    idx = np.argsort(p)[::-1]
    for k in range(5):
        c = int(idx[k])
        label_image = get_image_label(c)
        label_image = label_image.repeat(f, axis=0).repeat(f, axis=1)
        insert_subimage(results_image, label_image, n * row_height, (k + 1) * f*width)

        print('\ttop%d: %f %02d:%s' % (k, p[c], c, classnames[c]))
        cv2.putText(results_image, '%02d:%s%s' % (c, classnames[c][0:15], '...' if len(classnames[c])>15 else ''),
                    (5+(k + 1) * f*width, (n+1) * row_height-50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
        cv2.putText(results_image, 'top%d: %f' % (k, p[c]),
                    (5+(k + 1) * f*width, (n+1) * row_height-20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
    print('')

print('')
print('')
print('**visual results**: X, followed by top-5')
cv2.imwrite(OUT_DIR+'/extra_predictions.jpg', cv2.cvtColor(results_image, cv2.COLOR_BGR2RGB) )
plt.rcParams["figure.figsize"] = (30,30)
plt.imshow(results_image.astype(np.uint8))
plt.axis('off')
plt.show()
This section is not required to complete, but acts as an additional exercise for understanding the output of a neural network's weights. While neural networks can be a great learning device, they are often referred to as a black box. We can better understand what the weights of a neural network look like by plotting their feature maps. After successfully training the neural network, we can see what its feature maps look like by plotting the output of the network's weight layers in response to a test stimulus image. From these plotted feature maps, it's possible to see what characteristics of an image the network finds interesting. For a sign, maybe the inner network feature maps react with high activation to the sign's boundary outline or to the contrast in the sign's painted symbol.
from tensorflow.python.tools.inspect_checkpoint import print_tensors_in_checkpoint_file
### Visualize feature maps based on the activations functions
# image_input: the test image being fed into the network to produce the feature maps
# tf_activation: should be a tf variable name used during your training procedure that represents the calculated state of a specific weight layer
# activation_min/max: can be used to view the activation contrast in more detail, by default matplot sets min and max to the actual min and max values of the output
# plt_num: used to plot out multiple different weight feature map sets on the same block, just extend the plt number for each new feature map entry
def outputFeatureMap(image_input, tf_activation, activation_min=-1, activation_max=-1, plt_num=1):
    """Plot every feature map a layer produces for one input image.

    The activation is evaluated with the module-level ``sess`` in
    evaluation mode, drawn as a grid with 8 maps per row and saved to
    ``OUT_DIR + '/visualize_image_CNN_<plt_num>.png'``.

    Args:
        image_input: the image batch fed to the ``data`` placeholder; only
            the maps of its first image are drawn. It must already be
            pre-processed the way the network expects.
        tf_activation: 4-D tensor of the layer to visualise.
        activation_min: lower limit of the grey scale, or -1 to let
            matplotlib use the actual minimum.
        activation_max: upper limit of the grey scale, or -1 to let
            matplotlib use the actual maximum.
        plt_num: matplotlib figure number; also used in the title and the
            file name.
    """
    activation = tf_activation.eval(session=sess, feed_dict={data : image_input, IS_TRAIN_PHASE: 0})
    featuremaps = activation.shape[3]

    # At least one 8-inch band, so layers with fewer than 32 maps do not get
    # a zero-height figure.
    plt.figure(plt_num, figsize=(15, 8*max(1, featuremaps//32)))
    rows = math.ceil(featuremaps/8)

    # -1 means "limit not set". The former `a != -1 & b != -1` parsed as a
    # chained comparison against `-1 & b`, failing for float limits.
    limits = {}
    if activation_min != -1:
        limits['vmin'] = activation_min
    if activation_max != -1:
        limits['vmax'] = activation_max

    for featuremap in tqdm_notebook(range(featuremaps), desc = 'Preparing viuslaizer'):
        plt.subplot(rows, 8, featuremap+1)  # 8 feature maps per row
        plt.title('FeatureMap ' + str(featuremap))
        plt.imshow(activation[0,:,:, featuremap], interpolation="nearest", cmap="gray", **limits)

    plt.suptitle('Convolution activation layer ' + str(plt_num), size=16)
    plt.savefig(OUT_DIR+'/visualize_image_CNN_'+str(plt_num)+'.png', bbox_inches="tight")
# outputFeatureMap() evaluates with the module-level `sess`, so it must be
# called while this session (rebound to `sess` here) is still open.
with tf.Session() as sess:
    saver.restore(sess, OUT_DIR + '/check_points/final.ckpt')

    ### Feature map highlighter
    # Pick a random test image and give it a leading batch axis
    ix = int(np.random.random() * X_test.shape[0])
    random_image = np.expand_dims(X_test[ix], axis=0)

    plt.figure(figsize=(3,3))
    # The images are float32 with values in 0..255; cast to uint8 for
    # display, as every other cell of this notebook does.
    plt.imshow(X_test[ix].astype(np.uint8))
    plt.show()

    resized_image = cv2.resize(X_test[ix], (50, 50))
    cv2.imwrite(OUT_DIR+'/visualize_image.png', cv2.cvtColor(X_test[ix], cv2.COLOR_BGR2RGB))

    outputFeatureMap(random_image, block1, plt_num=1)
    #outputFeatureMap(random_image, block2, plt_num=2)